Docker安装RocketMQ 4.8 |
您所在的位置:网站首页 › rocketmq 集群 docker › Docker安装RocketMQ 4.8 |
1拉取4.8镜像
docker pull foxiswho/rocketmq:4.8.0
拉取控制台镜像 docker pull styletang/rocketmq-console-ng 2创建rocketmq使用的共有网络,便于相互访问 docker network create rocketmq_network 3启动rmqnamesrv docker run -d --name rmqnamesrv --network rocketmq_network -e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" -p 9876:9876 foxiswho/rocketmq:4.8.0 sh mqnamesrv–name rmqnamesrv:指定容器名称为rmqnamesrv,注意这个名字,后续会使用。 –network rocketmq_network:为容器指定网络为rocketmq_network,同一网络下的容器能够通过容器名称互通。 4启动rmqbroker创建映射日志文件夹 赋予权限 mkdir -p /data/docker/rocketmq/logs chmod 777 logs创建映射配置文件/data/docker/rocketmq/conf/broker.conf 编辑文件内容 broker.conf brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH mesrvAddr = 66.88.14.183:9876 brokerIP1 = 66.88.14.183 listenPort = 10911启动rmqbroker docker run -d --name rmqbroker --network rocketmq_network --privileged=true -v /data/docker/rocketmq/logs:/home/rocketmq/logs/rocketmqlogs -v /data/docker/rocketmq/conf/broker.conf:/home/rocketmq/rocketmq-4.8.0/conf/broker.conf -e "NAMESRV_ADDR=rmqnamesrv:9876" -e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" -p 10911:10911 -p 10912:10912 -p 10909:10909 foxiswho/rocketmq:4.8.0 sh mqbroker autoCreateTopicEnable=true -c /home/rocketmq/rocketmq-4.8.0/conf/broker.conf–privileged=true:如果使用-v映射了目录,则使用该参数获取文件访问权限 (容器之间可以通过容器名称链接) 验证容器之间是否互通:# 进入broker容器 docker exec -it rmqbroker /bin/bashping name-server的容器名称 ping rmqnamesrv 5启动rmqconsole docker run -d --name rmqconsole --network rocketmq_network --link rmqnamesrv:rmqnamesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8180:8080 -t styletang/rocketmq-console-ng访问8180端口 出现下图搭建成功 如果失败 进去rmqbroker容器中 查看下日志 并且查询是否 ping rmqnamesrv 通容器 引入pom依赖 org.apache.rocketmq rocketmq-client 4.8.0消费者代码 public class Consumer { public static void main(String[] args) throws MQClientException { /*消息接受*/ //1. 创建消息消费者, 指定消费者所属的组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myproducer-group"); //2. 指定Nameserver地址 consumer.setNamesrvAddr("66.66.14.183:9876"); //3.指定消费者订阅的主题和标签 consumer.subscribe("broker-a", "myTag"); //4.设置回调函数,编写处理处理消息的方法 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : msgs) { String s = new String(msg.getBody(), StandardCharsets.UTF_8); System.out.println(s); } //返回消费状态 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者 consumer.start(); } }生产者代码 public class Provider { public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { /*消息发送*/ //1. 创建消息生产者, 指定生产者所属的组名 DefaultMQProducer producer = new DefaultMQProducer("myproducer-group"); //2. 指定Nameserver地址 producer.setNamesrvAddr("66.66.14.183:9876"); //3. 启动生产者 producer.start(); //4. 创建消息对象,指定主题、标签和消息体 String data = "holle word"; Message msg = new Message("broker-a", "myTag", data.getBytes()); //5. 发送消息 SendResult sendResult = producer.send(msg, 10000); System.out.println(sendResult); //6. 关闭生产者 producer.shutdown(); } }测试结果 |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |